kafka提供了4类核心API
Producer API 生产消息相关接口,自定义生产者、自定义分区分配
Consumer API 消费消息相关接口,创建消费者、消费偏移量管理
Streams API 构建流处理程序的接口
Connect API kafka和外部系统进行数据流连接的连接器,实现将数据导入到kafka或从kafka中导出到外部系统。
1 | <dependencies> |
生产者
实现kafka生产者的一般步骤
创建Properties对象,设置生产者级别配置,必须包含的3个配置
bootstrap-server:配置kafka地址
key.serializer:配置用于序列化key的类
value.serializer:配置用于序列化实际数据的类
- 选填的配置(参照ProducerConfig类)
- acks ack的应答级别,0/1/-1
- retries 重试次数
- batch.size 批次大小(达到该大小才会发送)
- linger.ms 等待时间(如果达到该时间时,batch.size依然没有达到,也会进行发送)
- buffer.memory RecordAccumulator缓冲区大小,消息会组织成批次放入缓冲区中
- 根据Properties对象实例化一个KafkaProducer对象
- 实例化ProducerRecord对象,每条消息对应ProducerRecord对象
调用KafkaProducer发送消息的方法将ProducerRecord发送到kafka,有两种方法send(ProducerRecord),send(ProducerRecord,Callback)。 KafkaProducer默认是异步发送,会将消息缓存到消息缓冲区中,当消息在消息缓冲区中累计到一定数量之后作为一个RecordBatch发送。生产者发送消息分为两个阶段:第一阶段是将消息发送到消息缓冲区;第二阶段是一个Sender线程负责将缓冲区的消息发送到kafka,执行真正的I/O操作,在第一阶段执行完会返回一个Feature对象,根据对Feature对象处理方式不同,分为两种发送方式
异步方式: 使用异步方式如果希望知道消息发送成功与否,在回调函数Callback中进行相应处理
同步方式: 通过调用send方法返回的Feature对象的get()方法以阻塞式获取执行结果,即等待Sender线程处理的最终结果
- 关闭KafkaProducer,释放连接资源
单线程生产者
1 | /** |
同步方式
1 | ProducerRecord<String, String> record = null; |
异步方式(使用回调函数)
1 | ProducerRecord<String, String> record = null; |
多线程生产者
1 | public class ProducerThread implements Runnable { |